-
Notifications
You must be signed in to change notification settings - Fork 845
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize batch span processor #2983
Optimize batch span processor #2983
Conversation
sdk/trace/build.gradle.kts
Outdated
@@ -45,6 +45,7 @@ dependencies { | |||
jmh("io.grpc:grpc-api") | |||
jmh("io.grpc:grpc-netty-shaded") | |||
jmh("org.testcontainers:testcontainers") // testContainer for OTLP collector | |||
implementation("org.jctools:jctools-core:3.2.0") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do not want to take additional external dependencies like this for our SDK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want to use this, we'll need to shade it into our project, rather than have it be exposed as a transitive dependency.
0ad85d7
to
046066d
Compare
Can you include the benchmarks, before and after in this PR description, rather than just in the middle of a very long discussion on the related issue? |
046066d
to
8c2471b
Compare
import java.util.logging.Level; | ||
import java.util.logging.Logger; | ||
import org.jctools.queues.MpscArrayQueue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this better than disruptor
? Can we run the same benchmark for https://github.com/open-telemetry/opentelemetry-java/tree/main/sdk-extensions/async-processor ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bogdandrutu Is there an existing jmh benchmark with DisruptorAsyncSpanProcessor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately not, but I think you can copy-paste the same one that you have and only initialize that SpanProcessor.
Can you also include the |
This is something I couldn't run locally (using docker-desktop on mac). I might be missing some setup required for jmh. Can you run this through the BatchSpanProcessorBenchmark and post the results? |
@@ -92,6 +92,7 @@ val DEPENDENCIES = listOf( | |||
"org.awaitility:awaitility:4.0.3", | |||
"org.codehaus.mojo:animal-sniffer-annotations:1.20", | |||
"org.curioswitch.curiostack:protobuf-jackson:1.2.0", | |||
"org.jctools:jctools-core:3.2.0", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From looking the benchmarks, I think making the general change of signaling instead of polling makes sense to me. Let's not add this dependency in this PR though, we'll need to figure out how to vendor in only the MpscQueue
to keep the size down if we find it to be significant (it seems like we might be but let's think about it separately)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, lets get the MpscQueue in a different PR. @anuraaga Any help would be great !
sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java
Outdated
Show resolved
Hide resolved
if (flushRequested.compareAndSet(null, flushResult)) { | ||
lock.lock(); | ||
try { | ||
needExport.signal(); | ||
} finally { | ||
lock.unlock(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now that you grab the lock, I think it will be a big simplification if the flushRequested is protected by the lock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you clarify what would it simplify? Are you saying we can get rid of the AtomicReference? Then the exporter thread then have to take locks inside flush() as well as checking the flushRequested flag inside the lock.
I don't understand the benchmark
And of course forceFlush() is a major cost in that benchmark which we don't expect any improvements from this change. |
It only does a I have no idea why we have that duplicated benchmark. It should probably be deleted. |
Still it does do a forceFlush() on every iteration of the export(). The benchmark is not testing real scenarios.
|
One |
private final Collection<SpanData> batch; | ||
private final ReentrantLock lock; | ||
public AtomicLong droppedSpansCounter = new AtomicLong(0); | ||
public AtomicLong exportedSpansCounter = new AtomicLong(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a temporary add to show the benchmark results as I couldn't get the metric to work correctly.
Added a benchmark with different thread configuration. Ran the BatchSpanProcessor under different versions. The benchmark clearly shows the overhead of locks/signal/polling.
All the benchmarks use the following configuration:
|
Is a 0ms exporter delay a "real-life" configuration? Sure, if export is free, then we can improve the BSP CPU usage, but if export takes any time at all, is the BSP CPU usage swamped by the export, in which case these tweaks are of limited value? |
I think you are missing the difference between a throughput benchmark and a cpu one. As mentioned in my previous comment, 0ms is meant for throughput benchmark and clearly shows the lock/signal/polling overhead. It also shows that forceFlush() impact on benchnmark. As mentioned in my previous comment that I am following up with a cpu benchmark. This reflects "real world" scenario. JMH doesn't support measuring CPU time of exporter thread. So one has to use a profiler, in my case I used YJP Raw Data of CPU (user + kernel) time in msChartThroughput stays the same |
So, if I read your graphs correctly,
Is the best option that doesn't involve us depending on a 3rd party library. Do you have a PR with that implementation? Does it also do time-based exports so it will work correctly in low-throughput situations? |
Yes, it is java's inbuilt ConcurrentLinkedQueue. I have a local branch that use the ConcurrentLinkedQueue and yes it does time based export. We can go with this approach for now and pull in MPSCQueue later since it makes the implementation really clean and most efficient. I can update this PR if there is consensus |
@@ -33,13 +33,13 @@ | |||
@State(Scope.Benchmark) | |||
public class BatchSpanProcessorBenchmark { | |||
|
|||
private static class DelayingSpanExporter implements SpanExporter { | |||
public static class DelayingSpanExporter implements SpanExporter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we want to re-use this, please pull it up out of this class to the top level, so it's clear it's a reusable component for benchmarking.
I understand all of what you've said. Why are you opposed to having this as a separate incubator implementation for others to try out before we make it into the main default implementation? |
Also please note that throughput alone is a bad metric in the benchmark. One has to really look at the exportedSpans. For example I can write code that simply returns without adding to the queue and the throughput would be super high. So the existing BatchSpanProcessorBenchmark is even more meaningless TBH |
If you feel so strongly that our benchmark is meaningless (btw, please watch your language and be aware that there are human beings who wrote this code), then let's do something else. Please put in a PR that makes the benchmarks meaningful as a separate PR from also changing the implementations. |
I am in no way talking about people who contributed to the code. We really appreciate all the work of the people who contributed here. I am only pointing out that the existing benchmark is not helpful for multiple reasons.
And we have contributed two benchmarks that addresses the above issues. I will look into the incubator implementation, thanks for pointing it up. |
So, it will be very helpful to have the work separated. Let's have the benchmarks updated to be better, and have general agreement from the maintainers that it is better and more useful. Let's run the existing implementation against them once that is merged. Then, as a separate step, let's have a PR that changes the implementation and shows the change in the output of the benchmarks. |
Separate PR for benchmarks https://github.com/open-telemetry/opentelemetry-java/pull/3017/files |
|
||
private Worker( | ||
SpanExporter spanExporter, | ||
long scheduleDelayNanos, | ||
int maxExportBatchSize, | ||
long exporterTimeoutNanos, | ||
BlockingQueue<ReadableSpan> queue) { | ||
ConcurrentLinkedQueue<ReadableSpan> queue, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm trying to get up to speed on thread - did you compare with ArrayBlockingQueue
? I would be surprised if the linked list, with the allocations it needs to do would perform better and anyways we want to avoid the complicated counter increment. There is almost no work in the blocking section so even with contention it should not really put threads to sleep I guess.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm particularly worried about the allocations since more garbage means unrelated app code ends up being affected by increased GC
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you pease elaborate on what you meant by compare with ArrayBlockingQueue
? My benchmarks took ArrayBlockingQueue
the current implementation as the baseline and this approach showed clear improvement on throughput and CPU. Do you mean using ArrayBlockingQueue
with batched signal? If so, yes I did ran the benchmark but it is much less throughput than using concurrent queue though there is good CPU overhead improvements.
Regarding the GC, yes there would be some increased allocations. So ideally we should be pulling in MPSCQueue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean using ArrayBlockingQueue with batched signal? I
Yeah I meant this - I don't see it in the graphs, it would be nice to see the numbers to know what the real effect is. If it means simplifying the code (meaning not keeping track of the span count, for example) with a small difference, then it's still worth it especially given we may try to replace with MPSCQueue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here are the benchmark results. Throughput suffers with BlockingQueue, but exporter CPU overhead is less with batched signal. ConcurrentQueue is better overall though.
I don't see any significant gc.alloc.rate increase. The increase is well within the error boundary.
With blocking queue batch signal
With concurrent queue batch signal
if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) { | ||
exportCurrentBatch(); | ||
updateNextExportTime(); | ||
} | ||
if (queue.isEmpty()) { | ||
lock.lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use a blocking queue for the signalling too? I think a one element queue is mostly equivalent but means we don't have to manage the locks ourselves, simplifying the code drastically.
4e94955
to
c102d8f
Compare
} else { | ||
queue.offer(span); | ||
queueSize.incrementAndGet(); | ||
if (addedSpansCounter.incrementAndGet() % maxExportBatchSize == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jkwatson This reminds me of an issue I filed a long long time ago
open-telemetry/opentelemetry-specification#849
Never got clarification on whether we're supposed to eagerly export when we have maxExportBatchSize items. But it's true that what are current code is doing so we should preserve it for now which I think is the intent of this line.
droppedSpans.add(1); | ||
} else { | ||
queue.offer(span); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not comfortable with the thread-safety issues of this, it seems the queue can easily go over maxQueueSize which is an invariant we need to preserve. Since ArrayBlockingQueue
is a queue with max capacity, let's just use it, it's defined for our job here - throughput with 0ms delay is sort of interesting theoretically but not an actual use case for this class, we can enjoy the CPU improvements when idle though thanks to the signalling which is what matters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed queue can go over maxQueueSize constraint temporarily. How about optimistically trying to insert into queue and backing off if maxQueueSize is reached?
if (size.incrementAndGet() >= maxQueueSize) {
size.decrementAndGet();
return false;
}
queue.offer(span);
return true
concurrentQueue does have better behavior under contention with reduced exporter cpu overhead and writer thread overhead.
If MPSCQueue is the way forward as an extension, then I feel we are good with a blocking queue and batch signaling. Is it realistic to assume a batch span processor based on MPSCQueue could be supported soon?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sbandadd Discussed a bit with @jkwatson and let's go ahead and split this into two parts in this order
- Replacing constant polling with signaling
- Improve concurrency of the queuing
We don't need to bundle them into a single PR. So can we update this one to use signaling but stick with ArrayBlockingQueue
, which you've demonstrated does improve CPU significantly? And in a followup we can address concurrency, most likely by vendoring in MPSCQueue
as I don't see that being a big problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing. This sounds like a solid plan to me. Thanks for reaching a consensus !
@sbandadd would it be possible for you to attend one of our SIG meetings to discuss this all in more detail? Or jump onto CNCF slack and discuss in more real-time? |
Sure, I can join the SIG meetings. I joined the CNCF opentelemtety channel as well. Thanks ! |
c102d8f
to
f47c1dc
Compare
while (!queue.isEmpty() && batch.size() < maxExportBatchSize) { | ||
batch.add(queue.poll().toSpanData()); | ||
} | ||
|
||
if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) { | ||
exportCurrentBatch(); | ||
updateNextExportTime(); | ||
} | ||
if (queue.isEmpty()) { | ||
try { | ||
long pollWaitTime = nextExportTime - System.nanoTime(); | ||
if (pollWaitTime > 0) { | ||
signal.poll(pollWaitTime, TimeUnit.NANOSECONDS); | ||
} | ||
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this produce the same result using drain? Then I think it lets us use queue.size() >= maxExportBatchSize()
in addSpan
instead of maintaining a counter.
while(queue.size() >= maxExportBatchSize)) {
batch.clear(); // Can extract method for these three lines.
queue.drainTo(batch);
export(batch);
}
if (!queue.isEmpty() && System.nanoTime() >= nextExportTime) {
batch.clear();
queue.drainTo(batch);
export(batch);
updateNextExportTime();
} else {
long pollWaitTime = nextExportTime - System.nanoTime();
if (pollWaitTime > 0) {
signal.poll(pollWaitTime, TimeUnit.NANOSECONDS);
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the race would go away anyways if removing signal.clear()
from that suggestion - the worst is just that it evaluates the while
and if
one more time before sleeping.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic looks really complicated imho. Repeatedly copying queue into the batch is bit confusing as well as the race. Using a counter we are essentially amortizing the cost of signal, it is intuitive to think that a signal every maxExportBatchSize essentially implies the exporter thread when receiving the signal is guaranteed to do the export.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In you suggestion, isn't export(batch) already clearing the batch? Also in the if condition !queue.isEmpty() && System.nanoTime() >= nextExportTime
, it is exporting the whole batch at once instead of in maxExportBatchSize chunks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is intuitive to think that a signal every maxExportBatchSize essentially implies the exporter thread when receiving the signal is guaranteed to do the export.
But can't the signal be missed, since it's already exporting or something like that?
In you suggestion, isn't export(batch) already clearing the batch?
Yeah probably, it's somewhat pseudocode
Also in the if condition !queue.isEmpty() && System.nanoTime() >= nextExportTime, it is exporting the whole batch at once instead of in maxExportBatchSize chunks.
We've already exported maxExportBatchSize chunks. However, every time the interval passed, we are supposed to clear the queue fully, so this is where remaining spans get exported - it won't export more than maxExportBatchSize even if the queue suddenly had many spans added, because drainTo
won't fill more than batch.size()
it's ok.
I think the loop, then if pattern here actually captures the complexity more directly. We have two conditions
- Eagerly export as many spans as possible when over maxExportBatchSize
- If interval has passed, make sure to export entire queue
So these are handled with two conditionals any time the thread wakes up, since there are actually two conditions. Does it make sense?
Let's not worry about the race, if we never clear the signal queue we're good and I think that will work well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current code handles sending chunks with size less than maxExportBatchSize when interval is passed. I am not sure what becomes unclear here?
If I'm not mistaken, then if a chunk is sent that isn't maxExportBatchSize
then the %
becomes out of sync. The intention of this %
is to send chunks with size maxExportBatchSize
but if any batch is sent with a different size due to the interval check, then when the signal is sent, we will have less or more than maxExportBatchSize
in an unclear way. For example if batch size is 5, and 4 spans are sent due to the interval check, then the signal will happen with only 1 span in the queue. I don't think that's our intention with that signal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated the PR to just use an atomic boolean and queue size. I think it will make things much more clear though I didn't see any change in benchmark results.
drainTo() holds up queue lock for a long time which negatively impacts the writer threads.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this latest iteration quite a bit. Unfortunately, it looks like BatchSpanProcessorTest.forceExport()
is quite flaky with this implementation. It failed about 1/5 times for me locally. I'm not sure what's going on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jkwatson could you paste the error? I ran the test 10 times and couldn't reproduce locally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It just fails the first assertion with a '0', rather than a '49'.
Description: Batch span processor currently is aggressive in the sense that any new spans are sent to the exporter, this involves lots of overhead from signaling under heavy load and overhead from constant polling by exporter thread under less load. This PR makes exporter thread wait for maxExportBatchSize to avoid busy polling of the queue. BatchSpanProcessorMultiThreadBenchmark.java result ![image](https://user-images.githubusercontent.com/62265954/111420486-893c7300-86a8-11eb-8f87-feb2f86f00fc.png) BatchSpanProcessorCpuBenchmark.java result ![image](https://user-images.githubusercontent.com/62265954/111420492-8e012700-86a8-11eb-800e-7de1fbe2c2b1.png)
f47c1dc
to
5ac7737
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a relatively small change with a significant improvement in signaling strength. Thanks!
sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java
Outdated
Show resolved
Hide resolved
37d2863
to
556210e
Compare
556210e
to
216fa44
Compare
I made a minor update to use a AtomicInteger instead of AtomicLong. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the hard work on this!
Thanks @sbandadd! |
Description:
Batch span processor currently is aggressive in the sense that any new spans are sent to the exporter,
this involves lots of overhead from signaling under heavy load and overhead from constant polling by exporter thread
under less load. This PR makes exporter thread wait for maxExportBatchSize to avoid busy polling of the queue.
Benchmark results
BatchSpanProcessorMultiThreadBenchmark.java result
BatchSpanProcessorCpuBenchmark.java result